草庐IT

c# - 在基于 Rx 计数的聚合中,将计数重置为超过最大时间间隔

无时间限制的基于计数的过滤IObservablefilteredStream=changes.Buffer(3);如何引入闲置重置?但是如何引入超时TimeSpantooLong以便在两个值之间的间隔超过此最大值时从零重新开始计数? 最佳答案 我认为这就是您所追求的。varlongGap=source.Throttle(tooLong);varfiltered=source.Window(()=>{returnlongGap;})//GivesawindowbetweeneverylongGap.Select(io=>io.Buff

c# - Rx 运算符到不同的序列

重要:有关结果的描述和更多详细信息,请同时查看我的回答我需要对通常被复制的一系列对象/事件进行分组和过滤,并使用TimeSpan间隔对它们进行缓冲。我尝试用一​​些大理石图更好地解释它:X-X-X-X-X-Y-Y-Y-Z-Z-Z-Z-X-X-Y-Z-Z会产生X---Y---Z---X---Y---Z其中X、Y和Z是不同的事件类型,'---'表示间隔。此外,我还想通过一个关键属性来区分它在所有类型上都可用,因为它们有一个共同的基类:X,Y,Z:A并且A包含一个属性Key。使用符号X.a表示X.Key=a,最终示例将是:X.a-X.b-X.a-Y.b-Y.c-Z.a-Z.a-Z.c-Z.b

c# - 可观察到 Rx 中的回调

我正在寻找一种优雅的方式来使用Rx从一个普通的回调委托(delegate)创建一个Observable,类似于Observable.FromEventPattern?说,我正在包装Win32EnumWindows回调我提供的EnumWindowsProc的API。我知道我可以为这个回调创建一个临时的C#事件适配器并将它传递给FromEventPattern。此外,我可能可以手动实现IObservable,因此它会从我的EnumWindowsProc回调中调用IObserver.OnNext。是否存在我缺少的用于在Rx中包装回调的现有模式? 最佳答案

c# - RX 和异常处理

我正在为一种进程中的消息总线使用响应式扩展。实现非常简单。注册的样子publicIDisposableRegister(Actionaction)whereT:IMessage{returnthis.subject.OfType().Subscribe(action);}然后简单地发送:privatevoidSendMessage(IMessagemessage){this.subject.OnNext(message);}但是我现在在RX的异常行为方面遇到了一些麻烦。一个异常在注册/订阅的操作中被抛出——Observable的“流”被破坏并且将不再订阅。由于此消息总线用于应用程序的两

c# - 当数据流的速度快于订阅者可以消耗的速度时,Rx 会如何表现?

我对在生产应用程序中使用Rx感到非常兴奋;我将在其中收听来自不同channel的传入通知更新。我将在此流之上编写Rx查询,我将在其中使用.Window()运算符进行节流。订阅者(在我的例子中是ActionBlock)将以阻塞方式处理这些数据;(即它不会从ActionBlock生成任务)。请记住,如果数据的速度比我的订阅者可以消耗的速度快得多,那么传入数据会发生什么。Rx查询是否在内部使用任何缓冲区;它会溢出吗? 最佳答案 您所指的现象称为背压,Rx团队目前正在探索处理这种情况的不同方法。一种解决方案可能是将背压反馈给Observab

c# - Reactive Extensions (Rx) 可以跨进程或机器边界使用吗?

依稀记得很久以前看到过一些关于这个的讨论,但从那以后就再也没有听到过任何消息。那么基本上您可以在远程机器上订阅IObservable吗? 最佳答案 您可以使用IObservable.Remotable通过.NETRemoting直接从其他机器使用可观察对象。 关于c#-ReactiveExtensions(Rx)可以跨进程或机器边界使用吗?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questi

c# - 使用 Rx( react 性扩展)观察 ObservableCollection 中的特定项目

我有一个ObservableCollection,我需要为特定项目引用它。如果该项目不存在,我需要通过ReactiveExtensions监控它是否/何时出现该项目,但在设置声明时需要一些帮助。我仍然不熟悉所有不同的Linq扩展是如何工作的,所以我不确定该怎么做。谁能指出我正确的方向?为了更好地说明,我需要像下面这样的东西:publicclassmyitem:INotifyPropertyChanged{privatestring_key;privatestring_value;publicstringkey{get{return_key;}set{_key=value;NotifyP

c# - 当没有值传入时,是否有 Rx 方法定期重复以前的值?

我遇到过的一个用例,我怀疑我不是唯一的一个,是一个像这样的方法:IObservableObservable.RepeatLastValueDuringSilence(thisIObservableinner,TimeSpanmaxQuietPeriod);这将从内部可观察对象返回所有future的项目,而且,如果内部可观察对象在一段时间内(maxQuietPeriod)没有调用OnNext,它只会重复最后一个值(当然直到内部调用OnCompleted或OnError)。一个合理的理由是服务定期ping出定期状态更新。例如:varmyStatus=Observable.FromEvent

c# - 使用 RX 的最佳实践——返回一个 Observable 还是接受一个 Observer?

使用ReactiveExtensions,我可以想出多种方法来模拟具有副作用/IO的操作-比如从聊天室订阅消息。我可以接受参数(比如聊天室)和一个Observer,返回一个Disposable,即DisposableSubscribeTo(stringchatRoom,Observerobserver)或者在给定参数的情况下返回一个Observable,即ObservableGetObservableFor(stringchatRoom)当返回一个Observable时,我还可以选择将其设置为“热”或“冷”,即在调用我的方法时或在订阅observable时执行实际订阅。此外,我可以使o

c# - 如何避免在 RX 中使用 Subjects

所以我一直在到处阅读Subject的用法是“坏的”——我有点同意这个推理。但是,我正在尝试想出避免使用它的最佳方法并举一个例子。目前我有一个抽象类用于我的持久化配置类,它有一个protectedSave()每当更改属性时调用它的方法应该持久化该类。此消息将消息发送到Subject通过IObservable暴露序列化服务监听和序列化类的接口(interface)。这在当时看来是最明显、最简单、最快捷的实现方式。那么不使用Subject的RX方法是什么?我会改为公开事件并使用Observable.FromEventPattern()吗?订阅它?-因为这似乎是一种更复杂的方法。